package com.paic.common.middleware.util

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Example Spark job that writes a small in-memory data set to an HBase table
 * through the old-API (`org.apache.hadoop.hbase.mapred`) `TableOutputFormat`.
 */
object SparkDataToHbase {

  /**
   * Entry point: parallelizes one sample record, converts it to an HBase
   * `Put` and saves it with `saveAsHadoopDataset`.
   *
   * The row key, column family, qualifier and table name used here are
   * hard-coded placeholder values.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sparkContext = new SparkContext(sparkConf)
    try {
      val rawData = List(("lilei", "14"))
      val hbaseRecords = sparkContext
        .parallelize(rawData)
        .map(record => rddToImmutableBytesWritable(record, "rowKey", "columnfaily", "qualify"))
      hbaseRecords.saveAsHadoopDataset(getHbaseConfiguration("tableName"))
    } finally {
      // Always release the context, even when the write fails.
      sparkContext.stop()
    }
  }

  /**
   * Builds a `JobConf` for writing to the given HBase table.
   *
   * The configuration starts from `HBaseConfiguration.create()`, uses
   * `TableOutputFormat` as the output format and stores `tableName` under
   * `TableOutputFormat.OUTPUT_TABLE`.
   *
   * @param tableName name of the target HBase table
   * @return the job configuration to pass to `saveAsHadoopDataset`
   */
  def getHbaseConfiguration(tableName: String): JobConf = {
    val hbaseConf = HBaseConfiguration.create()
    val jobConf = new JobConf(hbaseConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    jobConf
  }

  /**
   * Converts a pair of strings into a `(ImmutableBytesWritable, Put)` record
   * for the given row, column family and qualifier.
   *
   * NOTE(review): both elements of `dataTriple` are added under the SAME
   * family/qualifier, so the two values target the same cell and presumably
   * only one of them is visible when the row is read back. Confirm the
   * intended layout (e.g. separate qualifiers) against the table schema.
   *
   * @param dataTriple   pair of values to store (despite the name, a 2-tuple)
   * @param rowKey       row key of the `Put`
   * @param columnFamily column family both values are written to
   * @param column       column qualifier both values are written to
   * @return an empty `ImmutableBytesWritable` key paired with the populated `Put`
   */
  def rddToImmutableBytesWritable(
      dataTriple: (String, String),
      rowKey: String,
      columnFamily: String,
      column: String): (ImmutableBytesWritable, Put) = {
    val familyBytes = Bytes.toBytes(columnFamily)
    val qualifierBytes = Bytes.toBytes(column)
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(familyBytes, qualifierBytes, Bytes.toBytes(dataTriple._1))
    put.addColumn(familyBytes, qualifierBytes, Bytes.toBytes(dataTriple._2))
    (new ImmutableBytesWritable, put)
  }

  /**
   * Converts a pair of strings into a `(ImmutableBytesWritable, Put)` record,
   * using the first element as the column qualifier and the second element as
   * the cell value.
   *
   * @param rowKey       row key of the `Put`
   * @param columnFamily column family the cell is written to
   * @param dataTriple   (qualifier, value) pair (despite the name, a 2-tuple)
   * @return an empty `ImmutableBytesWritable` key paired with the populated `Put`
   */
  def rddToImmutableBytesWritables(
      rowKey: String,
      columnFamily: String,
      dataTriple: (String, String)): (ImmutableBytesWritable, Put) = {
    val (qualifier, value) = dataTriple
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value))
    (new ImmutableBytesWritable, put)
  }
}